﻿using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using IOP.Extension.Cache;
using IOP.Models.Message.MQTT;
using IOP.Models.Tree.BinaryTree;
using IOP.Pulsar.MQTT.Abstractions;

namespace IOP.Pulsar.MQTT
{
    /// <summary>
    /// 主题中心
    /// </summary>
    public class TopicCenter : ITopicCenter
    {
        /// <summary>
        /// 主题
        /// </summary>
        private ConcurrentRBTree<string, Topic> Topies = new ConcurrentRBTree<string, Topic>();

        /// <summary>
        /// 包缓存
        /// </summary>
        public ConcurrentDictionary<ushort, CachePublishPackage> CachePackages { get; } = new ConcurrentDictionary<ushort, CachePublishPackage>();

        /// <summary>
        /// 随机产生器
        /// </summary>
        private readonly Random Random = new Random();
        /// <summary>
        /// 缓存
        /// </summary>
        private readonly ICacheService _Cache;
        /// <summary>
        /// 锁
        /// </summary>
        private readonly object SyncRoot = new object();

        /// <summary>
        /// 构造函数
        /// </summary>
        public TopicCenter(ICacheService cache)
        {
            Topies.Put("/", null);
            _Cache = cache;
            Init();
        }

        /// <summary>
        /// 获取或者创建主题
        /// </summary>
        /// <param name="topicName"></param>
        /// <returns></returns>
        public Topic GetOrCreateTopic(string topicName)
        {
            if (Topies.Contains(topicName, out Topic topic))
                return topic;
            else
            {
                lock (SyncRoot)
                {
                    Topic newTopic = new Topic(topicName);
                    Topies.Put(topicName, newTopic);
                    _Cache.TryGetValue(MQTTConstant.CacheTopices, out List<string> topices);
                    if (topices == null) topices = new List<string>();
                    topices.Add(topicName);
                    _Cache.Set(MQTTConstant.CacheTopices, topices);
                    newTopic.OnCacheChanged += Topic_OnCacheChanged;
                    return newTopic;
                }
            }
        }

        /// <summary>
        /// 获取多个主题
        /// </summary>
        /// <param name="filter"></param>
        /// <returns></returns>
        public IEnumerable<Topic> GetTopics(string filter)
        {
            var result = new List<Topic>();
            foreach(var item in Topies)
            {
                if (item.Key.MatchWildcard(filter))
                    result.Add(item.Value);
            }
            return result;
        }

        /// <summary>
        /// 创建一个新的报文标识符
        /// </summary>
        /// <param name="oldValue"></param>
        /// <returns></returns>
        public ushort CreatePacketIdentifier(ushort oldValue = 0)
        {
            if (CachePackages.ContainsKey(oldValue) || oldValue == 0)
            {
                oldValue = (ushort)Random.Next(1, ushort.MaxValue);
                CreatePacketIdentifier(oldValue);
            }
            return oldValue;
        }

        /// <summary>
        /// 初始化主题中心
        /// </summary>
        private void Init()
        {
            _Cache.TryGetValue(MQTTConstant.CacheTopices, out List<string> topices);
            if (topices != null)
            {
                foreach (var item in topices)
                {
                    Topic topic = new Topic(item);
                    _Cache.TryGetValue($"{MQTTConstant.TopicPrefix}{item}", out List<CachePackage> cacheData);
                    if (cacheData != null)
                        topic.Datas = new ConcurrentBag<CachePackage>(cacheData);
                    topic.OnCacheChanged += Topic_OnCacheChanged;
                    Topies.Put(item, topic);
                }
            }
        }

        /// <summary>
        /// 当主题的缓存数据发生变更时
        /// </summary>
        /// <param name="topicName">主题名</param>
        /// <param name="data">数据包</param>
        private void Topic_OnCacheChanged(string topicName,ConcurrentBag<CachePackage> data)
        {
            lock (SyncRoot)
            {
                _Cache.Set($"{MQTTConstant.TopicPrefix}{topicName}", data);
            }
        }
    }
}
